{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "7.0\n",
      "5.0\n"
     ]
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "# 一个关于队列操作的程序，这里并不涉及多线程。\n",
    "tf.InteractiveSession()\n",
    "\n",
    "q = tf.FIFOQueue(2, \"float\")\n",
    "init = q.enqueue_many(([3,6],))\n",
    "\n",
    "x = q.dequeue()\n",
    "y = x+1\n",
    "q_inc = q.enqueue([y])\n",
    "\n",
    "init.run()\n",
    "q_inc.run()\n",
    "q_inc.run()\n",
    "q_inc.run()\n",
    "print(x.eval())  # 返回1\n",
    "print(x.eval())  # 返回2\n",
    "# x.eval()  # 卡住"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "tensorflow读取外部文件数据的过程：\n",
    "1. 文件系统——>文件名队列\n",
    "tf.train.string_input_producer(string_tensor,num_epochs=None,shuffle=True,seed=None,capactiy=32,shared_name=None,name=None,cancel_op=None)\n",
    "string_tensor:A 1-Dstring tensor with the strings to produce\n",
    "Returns: A queue with the output strings.A QueueRuner for the Queue is added to the current Graph's QUEUE_RUNNER collection.\n",
    "该函数创建了一个文件名队列，同时在图中当前图中添加了一个QUEUE_RUNNER用来管理读写该队列的线程，它是TF中对操作Queue的线程的封装。\n",
    "在队列被创建成功后，整个系统其实还是处于“停滞状态”，并没有文件名被加入到队列中，此时开始计算，会导致计算单元一直等待，导致整个系统被阻塞。使用tf.train.start_queue_runners之后，队列开始填充，系统不再停滞。\n",
    "也就是说我们在操作队列时，由于读入与读出之间的时间差，我们往往需要使用多线程进行处理，所以为了方便管理执行这些操作的线程，使用QueueRunner，在启动这些线程时，使用create_threads。\n",
    "2. 文件名队列——>内存队列"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "520.0\n",
      "534.0\n",
      "570.0\n",
      "580.0\n",
      "590.0\n",
      "723.0\n",
      "734.0\n",
      "747.0\n",
      "763.0\n",
      "778.0\n",
      "785.0\n",
      "795.0\n",
      "806.0\n",
      "821.0\n",
      "838.0\n",
      "853.0\n",
      "871.0\n",
      "889.0\n",
      "901.0\n",
      "918.0\n"
     ]
    }
   ],
   "source": [
    "import sys\n",
    "# 定义一个队列\n",
    "q = tf.FIFOQueue(10, \"float\") \n",
    "counter = tf.Variable(0.0)  #计数器\n",
    "# 给计数器加一（这是一个操作）\n",
    "increment_op = tf.assign_add(counter, 1.0)\n",
    "# 将计数器加入队列（这是一个入队操作）\n",
    "enqueue_op = q.enqueue(counter)\n",
    "\n",
    "# 显示地创建QueueRunner，管理执行上面两个操作的线程\n",
    "# 用多个线程向队列添加数据\n",
    "# 这里实际创建了4个线程，两个增加计数，两个执行入队\n",
    "qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)\n",
    "\n",
    "# 主线程\n",
    "sess = tf.InteractiveSession()\n",
    "tf.global_variables_initializer().run()\n",
    "# 启动入队线程\n",
    "qr.create_threads(sess, start=True)\n",
    "for i in range(20):\n",
    "    print (sess.run(q.dequeue()))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "值得注意的是，上面的代码在执行过程中，执行计数的线程不断运行，执行入队的线程会先执行10次（因为队列为10），当一部分数据被消费后（这个出队的操作由主线程控制），入队线程继续执行。而当主线程的20个循环走完后，计数和入队的线程并不会停止（其中入队的线程在队列满10个时，处于阻塞状态，而计数不会停止）"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "为了避免上面情况的发生，即一些线程停止但另一些还在运行。tensorflow使用coordinator（协调器）协调线程组的运行状态。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0\n",
      "1\n",
      "2\n",
      "3\n",
      "4\n",
      "5\n",
      "67\n",
      "\n",
      "8\n",
      "9\n",
      "0\n",
      "1\n",
      "2\n",
      "3\n",
      "4\n",
      "5\n",
      "6\n",
      "7\n",
      "98\n",
      "\n",
      "0\n"
     ]
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "import threading, time\n",
    "\n",
    "# 子线程函数\n",
    "def loop(coord, id):\n",
    "    t = 0\n",
    "    while not coord.should_stop():\n",
    "        print(id)\n",
    "        time.sleep(2)\n",
    "        t += 1\n",
    "        # 只有1号线程调用request_stop方法\n",
    "        if (t >= 2 and id == 1):\n",
    "            coord.request_stop()\n",
    "\n",
    "# 主线程\n",
    "coord = tf.train.Coordinator()\n",
    "# 使用Python API创建10个线程\n",
    "threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)]\n",
    "\n",
    "# 启动所有线程，并等待线程结束\n",
    "for t in threads: t.start()\n",
    "coord.join(threads)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在上面的程序中，所有线程通过调用协调器的should_stop()方法来判断是否停止线程。可以发现当线程id=1执行到t=2时，将调用协调器的request_stop()方法，将should_stop()设置为true。则其他线程也都终止。主线程会等待所有子线程都停止后结束。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们在前面知道，在操作队列时需要创建线程管理器QUENE_RUNNER，再通过create_threads方法启动线程，这是一种显示的进行队列操作的方式。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[array([  0.9855609,  -4.737697 , -20.064404 ,   6.8976574], dtype=float32), 1]\n",
      "[array([ 4.241793 ,  6.925385 , -5.6965413,  5.1872787], dtype=float32), 1]\n",
      "[array([ 15.496039 ,   5.0222635,  23.84017  , -10.769951 ], dtype=float32), 1]\n",
      "[array([  8.540631,  -4.458069, -21.396536,  16.745102], dtype=float32), 1]\n",
      "[array([ -2.627375 ,   5.0943184, -10.00617  ,  17.73765  ], dtype=float32), 1]\n",
      "[array([-11.748537,  25.893698,  10.663585,  -8.087724], dtype=float32), 1]\n",
      "[array([ 4.9214973,  0.9360481,  4.313497 , -2.1026533], dtype=float32), 0]\n",
      "[array([ 9.659984 ,  7.596754 , -5.2508945,  8.074907 ], dtype=float32), 1]\n",
      "[array([11.771369 ,  0.7063998,  2.0989969, -1.6823719], dtype=float32), 1]\n",
      "[array([-0.9249712,  7.794566 ,  5.4953156, -6.9203296], dtype=float32), 1]\n",
      "[array([  0.9855609,  -4.737697 , -20.064404 ,   6.8976574], dtype=float32), 1]\n",
      "[array([ 4.241793 ,  6.925385 , -5.6965413,  5.1872787], dtype=float32), 1]\n",
      "[array([ 15.496039 ,   5.0222635,  23.84017  , -10.769951 ], dtype=float32), 1]\n",
      "[array([  8.540631,  -4.458069, -21.396536,  16.745102], dtype=float32), 1]\n",
      "[array([ -2.627375 ,   5.0943184, -10.00617  ,  17.73765  ], dtype=float32), 1]\n",
      "[array([-11.748537,  25.893698,  10.663585,  -8.087724], dtype=float32), 1]\n",
      "[array([ 4.9214973,  0.9360481,  4.313497 , -2.1026533], dtype=float32), 0]\n",
      "[array([ 9.659984 ,  7.596754 , -5.2508945,  8.074907 ], dtype=float32), 1]\n",
      "[array([11.771369 ,  0.7063998,  2.0989969, -1.6823719], dtype=float32), 1]\n",
      "[array([-0.9249712,  7.794566 ,  5.4953156, -6.9203296], dtype=float32), 1]\n",
      "[array([  0.9855609,  -4.737697 , -20.064404 ,   6.8976574], dtype=float32), 1]\n",
      "[array([ 4.241793 ,  6.925385 , -5.6965413,  5.1872787], dtype=float32), 1]\n",
      "[array([ 15.496039 ,   5.0222635,  23.84017  , -10.769951 ], dtype=float32), 1]\n",
      "[array([  8.540631,  -4.458069, -21.396536,  16.745102], dtype=float32), 1]\n",
      "[array([ -2.627375 ,   5.0943184, -10.00617  ,  17.73765  ], dtype=float32), 1]\n",
      "[array([-11.748537,  25.893698,  10.663585,  -8.087724], dtype=float32), 1]\n",
      "[array([ 4.9214973,  0.9360481,  4.313497 , -2.1026533], dtype=float32), 0]\n",
      "[array([ 9.659984 ,  7.596754 , -5.2508945,  8.074907 ], dtype=float32), 1]\n",
      "[array([11.771369 ,  0.7063998,  2.0989969, -1.6823719], dtype=float32), 1]\n",
      "[array([-0.9249712,  7.794566 ,  5.4953156, -6.9203296], dtype=float32), 1]\n",
      "[array([  0.9855609,  -4.737697 , -20.064404 ,   6.8976574], dtype=float32), 1]\n",
      "[array([ 4.241793 ,  6.925385 , -5.6965413,  5.1872787], dtype=float32), 1]\n",
      "[array([ 15.496039 ,   5.0222635,  23.84017  , -10.769951 ], dtype=float32), 1]\n",
      "[array([  8.540631,  -4.458069, -21.396536,  16.745102], dtype=float32), 1]\n",
      "[array([ -2.627375 ,   5.0943184, -10.00617  ,  17.73765  ], dtype=float32), 1]\n",
      "[array([-11.748537,  25.893698,  10.663585,  -8.087724], dtype=float32), 1]\n",
      "[array([ 4.9214973,  0.9360481,  4.313497 , -2.1026533], dtype=float32), 0]\n",
      "[array([ 9.659984 ,  7.596754 , -5.2508945,  8.074907 ], dtype=float32), 1]\n",
      "[array([11.771369 ,  0.7063998,  2.0989969, -1.6823719], dtype=float32), 1]\n",
      "[array([-0.9249712,  7.794566 ,  5.4953156, -6.9203296], dtype=float32), 1]\n",
      "[array([  0.9855609,  -4.737697 , -20.064404 ,   6.8976574], dtype=float32), 1]\n",
      "[array([ 4.241793 ,  6.925385 , -5.6965413,  5.1872787], dtype=float32), 1]\n",
      "[array([ 15.496039 ,   5.0222635,  23.84017  , -10.769951 ], dtype=float32), 1]\n",
      "[array([  8.540631,  -4.458069, -21.396536,  16.745102], dtype=float32), 1]\n",
      "[array([ -2.627375 ,   5.0943184, -10.00617  ,  17.73765  ], dtype=float32), 1]\n",
      "[array([-11.748537,  25.893698,  10.663585,  -8.087724], dtype=float32), 1]\n",
      "[array([ 4.9214973,  0.9360481,  4.313497 , -2.1026533], dtype=float32), 0]\n",
      "[array([ 9.659984 ,  7.596754 , -5.2508945,  8.074907 ], dtype=float32), 1]\n",
      "[array([11.771369 ,  0.7063998,  2.0989969, -1.6823719], dtype=float32), 1]\n",
      "[array([-0.9249712,  7.794566 ,  5.4953156, -6.9203296], dtype=float32), 1]\n",
      "[array([  0.9855609,  -4.737697 , -20.064404 ,   6.8976574], dtype=float32), 1]\n",
      "[array([ 4.241793 ,  6.925385 , -5.6965413,  5.1872787], dtype=float32), 1]\n",
      "[array([ 15.496039 ,   5.0222635,  23.84017  , -10.769951 ], dtype=float32), 1]\n",
      "[array([  8.540631,  -4.458069, -21.396536,  16.745102], dtype=float32), 1]\n",
      "[array([ -2.627375 ,   5.0943184, -10.00617  ,  17.73765  ], dtype=float32), 1]\n",
      "[array([-11.748537,  25.893698,  10.663585,  -8.087724], dtype=float32), 1]\n",
      "[array([ 4.9214973,  0.9360481,  4.313497 , -2.1026533], dtype=float32), 0]\n",
      "[array([ 9.659984 ,  7.596754 , -5.2508945,  8.074907 ], dtype=float32), 1]\n",
      "[array([11.771369 ,  0.7063998,  2.0989969, -1.6823719], dtype=float32), 1]\n",
      "[array([-0.9249712,  7.794566 ,  5.4953156, -6.9203296], dtype=float32), 1]\n",
      "[array([  0.9855609,  -4.737697 , -20.064404 ,   6.8976574], dtype=float32), 1]\n",
      "[array([ 4.241793 ,  6.925385 , -5.6965413,  5.1872787], dtype=float32), 1]\n",
      "[array([ 15.496039 ,   5.0222635,  23.84017  , -10.769951 ], dtype=float32), 1]\n",
      "[array([  8.540631,  -4.458069, -21.396536,  16.745102], dtype=float32), 1]\n",
      "[array([ -2.627375 ,   5.0943184, -10.00617  ,  17.73765  ], dtype=float32), 1]\n",
      "[array([-11.748537,  25.893698,  10.663585,  -8.087724], dtype=float32), 1]\n",
      "[array([ 4.9214973,  0.9360481,  4.313497 , -2.1026533], dtype=float32), 0]\n",
      "[array([ 9.659984 ,  7.596754 , -5.2508945,  8.074907 ], dtype=float32), 1]\n",
      "[array([11.771369 ,  0.7063998,  2.0989969, -1.6823719], dtype=float32), 1]\n",
      "[array([-0.9249712,  7.794566 ,  5.4953156, -6.9203296], dtype=float32), 1]\n",
      "[array([  0.9855609,  -4.737697 , -20.064404 ,   6.8976574], dtype=float32), 1]\n",
      "[array([ 4.241793 ,  6.925385 , -5.6965413,  5.1872787], dtype=float32), 1]\n",
      "[array([ 15.496039 ,   5.0222635,  23.84017  , -10.769951 ], dtype=float32), 1]\n",
      "[array([  8.540631,  -4.458069, -21.396536,  16.745102], dtype=float32), 1]\n",
      "[array([ -2.627375 ,   5.0943184, -10.00617  ,  17.73765  ], dtype=float32), 1]\n",
      "[array([-11.748537,  25.893698,  10.663585,  -8.087724], dtype=float32), 1]\n",
      "[array([ 4.9214973,  0.9360481,  4.313497 , -2.1026533], dtype=float32), 0]\n",
      "[array([ 9.659984 ,  7.596754 , -5.2508945,  8.074907 ], dtype=float32), 1]\n",
      "[array([11.771369 ,  0.7063998,  2.0989969, -1.6823719], dtype=float32), 1]\n",
      "[array([-0.9249712,  7.794566 ,  5.4953156, -6.9203296], dtype=float32), 1]\n",
      "[array([  0.9855609,  -4.737697 , -20.064404 ,   6.8976574], dtype=float32), 1]\n",
      "[array([ 4.241793 ,  6.925385 , -5.6965413,  5.1872787], dtype=float32), 1]\n",
      "[array([ 15.496039 ,   5.0222635,  23.84017  , -10.769951 ], dtype=float32), 1]\n",
      "[array([  8.540631,  -4.458069, -21.396536,  16.745102], dtype=float32), 1]\n",
      "[array([ -2.627375 ,   5.0943184, -10.00617  ,  17.73765  ], dtype=float32), 1]\n",
      "[array([-11.748537,  25.893698,  10.663585,  -8.087724], dtype=float32), 1]\n",
      "[array([ 4.9214973,  0.9360481,  4.313497 , -2.1026533], dtype=float32), 0]\n",
      "[array([ 9.659984 ,  7.596754 , -5.2508945,  8.074907 ], dtype=float32), 1]\n",
      "[array([11.771369 ,  0.7063998,  2.0989969, -1.6823719], dtype=float32), 1]\n",
      "[array([-0.9249712,  7.794566 ,  5.4953156, -6.9203296], dtype=float32), 1]\n",
      "[array([  0.9855609,  -4.737697 , -20.064404 ,   6.8976574], dtype=float32), 1]\n",
      "[array([ 4.241793 ,  6.925385 , -5.6965413,  5.1872787], dtype=float32), 1]\n",
      "[array([ 15.496039 ,   5.0222635,  23.84017  , -10.769951 ], dtype=float32), 1]\n",
      "[array([  8.540631,  -4.458069, -21.396536,  16.745102], dtype=float32), 1]\n",
      "[array([ -2.627375 ,   5.0943184, -10.00617  ,  17.73765  ], dtype=float32), 1]\n",
      "[array([-11.748537,  25.893698,  10.663585,  -8.087724], dtype=float32), 1]\n",
      "[array([ 4.9214973,  0.9360481,  4.313497 , -2.1026533], dtype=float32), 0]\n",
      "[array([ 9.659984 ,  7.596754 , -5.2508945,  8.074907 ], dtype=float32), 1]\n",
      "[array([11.771369 ,  0.7063998,  2.0989969, -1.6823719], dtype=float32), 1]\n",
      "[array([-0.9249712,  7.794566 ,  5.4953156, -6.9203296], dtype=float32), 1]\n"
     ]
    }
   ],
   "source": [
    "import numpy as np\n",
    "# 10个4维输入向量，每个数取值为1-10之间的随机数\n",
    "data = 10 * np.random.randn(10, 4) + 1\n",
    "# 1000个随机的目标值，值为0或1\n",
    "target = np.random.randint(0, 2, size=10)\n",
    "\n",
    "# 创建Queue，队列中每一项包含一个输入数据和相应的目标值\n",
    "queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])\n",
    "\n",
    "# 批量入列数据（这是一个Operation）\n",
    "enqueue_op = queue.enqueue_many([data, target])\n",
    "# 出列数据（这是一个Tensor定义）\n",
    "data_sample, label_sample = queue.dequeue()\n",
    "\n",
    "# 创建包含4个线程的QueueRunner\n",
    "qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)\n",
    "\n",
    "with tf.Session() as sess:\n",
    "    # 创建Coordinator\n",
    "    coord = tf.train.Coordinator()\n",
    "    # 启动QueueRunner管理的线程\n",
    "    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)\n",
    "    # 主线程，消费100个数据\n",
    "    for step in range(100):\n",
    "        if coord.should_stop():\n",
    "            break\n",
    "        data_batch, label_batch = sess.run([data_sample, label_sample])\n",
    "        print([data_batch,label_batch])\n",
    "    # 主线程计算完成，停止所有采集数据的进程 \n",
    "    coord.request_stop() \n",
    "    #request that the threads stop,after this is called,calle to should_stop() will return True.\n",
    "    coord.join(enqueue_threads)\n",
    "    # Wait for threads to terminate."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "另一种启动线程的方式是采用start_queue_runners方法。在前面我们知道tf.train.string_input_producer在创建一个文件名队列时，将一个隐含的QueueRunner添加到全局图中。由于没有显示地返回QueueRunner来用create_threads启动线程，这里使用tf.train.start_queue_runners方法直接启动tf.GraphKeys.QUEUE_RUNNERS集合中的所有队列线程。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 创建文件名队列filename_queue\n",
    "filename_queue=tf.train.string_input_producer(['stat0.csv','stat1.csv'])  # 这两个文件是自定义的\n",
    "# 创建读取CSV文件的TextLineReader\n",
    "reader = tf.TextLineReader(skip_header_lines=1)\n",
    "# 从文件名队列中取出CSV文件中的一条数据记录value\n",
    "_,value=reader.read(filename_queue)\n",
    "# 设置数据记录的默认值\n",
    "record_defaults = [[1],[1],[1],[1]]\n",
    "# 使用decode_csv方法将数据记录的每个字段都转换为特征张量\n",
    "i_d,age,income,outgo=tf.decode_csv(value,record_defaults=record_defaults)\n",
    "# 将所有特征张量组合在一起形成一条记录\n",
    "features=tf.stack([i_d,age,income,outgo])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[   1   24 2048 1024]\n",
      "[   2   48 4096 2048]\n",
      "[   1   24 2048 1024]\n",
      "[   2   48 4096 2048]\n",
      "[   2   48 4096 2048]\n"
     ]
    }
   ],
   "source": [
    "init_op = tf.group(tf.global_variables_initializer(),tf.local_variables_initializer())\n",
    "with tf.Session() as sess:\n",
    "    sess.run(init_op)\n",
    "    coord = tf.train.Coordinator()\n",
    "    # 启动计算图中所有的队列线程\n",
    "    threads=tf.train.start_queue_runners(coord=coord)\n",
    "    # 主线程，消费100个数据\n",
    "    for _ in range(5):\n",
    "        example=sess.run(features)\n",
    "        print(example)\n",
    "    coord.request_stop()\n",
    "    coord.join(threads)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "     关于TF中Example的定义：\n",
    "    message Example {\n",
    "      Features features = 1;\n",
    "    };\n",
    "    message Features {\n",
    "      // Map from feature name to feature.\n",
    "      map<string, Feature> feature = 1;\n",
    "    };\n",
    "    message Feature {\n",
    "      // Each feature can be exactly one kind.\n",
    "      oneof kind {\n",
    "        BytesList bytes_list = 1;\n",
    "        FloatList float_list = 2;\n",
    "        Int64List int64_list = 3;\n",
    "      }\n",
    "    };\n",
    "    message BytesList {\n",
    "      repeated bytes value = 1;\n",
    "    }\n",
    "    message FloatList {\n",
    "      repeated float value = 1 [packed = true];\n",
    "    }\n",
    "    message Int64List {\n",
    "      repeated int64 value = 1 [packed = true];\n",
    "    }"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "TFRecords文件存储是有结构的序列化字符块，它是Tensorflow推荐的标准文件格式。Tensorflow通过Protocol Buffers定义了TFRecords文件中存储的数据记录及其所含字段的数据结构，它们分别定义在example.proto和feature.proto文件中。\n",
    "一个样例包含一组特征，一组特征由多个特征向量由多个特征向量组成的Python字典构成。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# 创建向TFRecords文件写数据记录的writer\n",
    "writer = tf.python_io.TFRecordWriter('stat.tfrecord')\n",
    "# 两轮循环构造输入样例\n",
    "for i in range(1,3):\n",
    "    # 创建example.proto中定义的样例\n",
    "    example = tf.train.Example(\n",
    "        features=tf.train.Features(\n",
    "            feature={\n",
    "                'id':tf.train.Feature(int64_list=tf.train.Int64List(value=[i])),\n",
    "                'age':tf.train.Feature(int64_list=tf.train.Int64List(value=[i*24])),\n",
    "                'income':tf.train.Feature(float_list=tf.train.FloatList(value=[i*2048.0])),\n",
    "                'outgo':tf.train.Feature(float_list=tf.train.FloatList(value=[i*1024.0]))\n",
    "                }\n",
    "        )\n",
    "    )\n",
    "    # 将样例序列化为字符串后，写入stat,tfrecord文件\n",
    "    writer.write(example.SerializeToString())\n",
    "# 关闭输入流\n",
    "writer.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# tf.parse_single_example(serialized,features,name=None,example_names=None)\n",
    "# serialized: A scalar string Tensor, a single serialized Example. \n",
    "# features: A dict mapping feature keys to FixedLenFeature or VarLenFeature values.\n",
    "# Returns:A dict mapping feature keys to Tensor and SparseTensor values.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ReaderReadV2(key=<tf.Tensor 'ReaderReadV2_2:0' shape=() dtype=string>, value=<tf.Tensor 'ReaderReadV2_2:1' shape=() dtype=string>)"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "# 创建文件名队列filename_queue\n",
    "filename_queue = tf.train.string_input_producer(['stat.tfrecord'],num_epochs=2)\n",
    "# num_epochs规定遍历文件几次。\n",
    "# 创建读取TFRecords文件的reader\n",
    "reader = tf.TFRecordReader()\n",
    "# 取出stat.tfrecord文件中的一条序列化的样例serialized_example\n",
    "serialized_example = reader.read(filename_queue)\n",
    "# 将一条序列化的样例转换为其包含的所有特征张量\n",
    "# features = tf.parse_single_example(\n",
    "# serialized_example,\n",
    "# features={\n",
    "#    'id':tf.FixedLenFeature([],tf.int64),\n",
    " #   'age':tf.FixedLenFeature([],tf.int64),\n",
    " #   'income':tf.FixedLenFeature([],tf.float32),\n",
    " #   'outgo':tf.FixedLenFeature([],tf.float32),\n",
    "# })\n",
    "serialized_example"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Thread:[<Thread(QueueRunnerThread-input_producer-input_producer/input_producer_EnqueueMany, started daemon 197652)>, <Thread(QueueRunnerThread-input_producer-close_on_stop, started daemon 214516)>, <Thread(QueueRunnerThread-input_producer_1-input_producer_1/input_producer_1_EnqueueMany, started daemon 214512)>, <Thread(QueueRunnerThread-input_producer_1-close_on_stop, started daemon 204932)>, <Thread(QueueRunnerThread-input_producer_2-input_producer_2/input_producer_2_EnqueueMany, started daemon 209800)>, <Thread(QueueRunnerThread-input_producer_2-close_on_stop, started daemon 191308)>, <Thread(QueueRunnerThread-input_producer_3-input_producer_3/input_producer_3_EnqueueMany, started daemon 213964)>, <Thread(QueueRunnerThread-input_producer_3-close_on_stop, started daemon 207548)>, <Thread(QueueRunnerThread-input_producer_4-input_producer_4/input_producer_4_EnqueueMany, started daemon 215088)>, <Thread(QueueRunnerThread-input_producer_4-close_on_stop, started daemon 207396)>, <Thread(QueueRunnerThread-input_producer_5-input_producer_5/input_producer_5_EnqueueMany, started daemon 193352)>, <Thread(QueueRunnerThread-input_producer_5-close_on_stop, started daemon 195712)>, <Thread(QueueRunnerThread-input_producer_6-input_producer_6/input_producer_6_EnqueueMany, started daemon 214076)>, <Thread(QueueRunnerThread-input_producer_6-close_on_stop, started daemon 174576)>, <Thread(QueueRunnerThread-input_producer_7-input_producer_7/input_producer_7_EnqueueMany, started daemon 213556)>, <Thread(QueueRunnerThread-input_producer_7-close_on_stop, started daemon 214788)>, <Thread(QueueRunnerThread-input_producer_8-input_producer_8/input_producer_8_EnqueueMany, started daemon 206088)>, <Thread(QueueRunnerThread-input_producer_8-close_on_stop, started daemon 210416)>, <Thread(QueueRunnerThread-input_producer_9-input_producer_9/input_producer_9_EnqueueMany, started daemon 212780)>, <Thread(QueueRunnerThread-input_producer_9-close_on_stop, started daemon 207784)>, <Thread(QueueRunnerThread-input_producer_10-input_producer_10/input_producer_10_EnqueueMany, started daemon 208376)>, <Thread(QueueRunnerThread-input_producer_10-close_on_stop, started daemon 200564)>]\n",
      "{'age': 24, 'income': 2048.0, 'outgo': 1024.0, 'id': 1}\n",
      "{'age': 48, 'income': 4096.0, 'outgo': 2048.0, 'id': 2}\n",
      "{'age': 24, 'income': 2048.0, 'outgo': 1024.0, 'id': 1}\n",
      "{'age': 48, 'income': 4096.0, 'outgo': 2048.0, 'id': 2}\n",
      "Catch OutOfRangeError\n",
      "Finish reading\n"
     ]
    }
   ],
   "source": [
    "init_op = tf.group(tf.global_variables_initializer(),tf.local_variables_initializer())\n",
    "# 当使用coordiator管理线程组时，需要先执行tf.local_variables_initializer方法对其初始化\n",
    "with tf.Session() as sess:\n",
    "    # 初始化\n",
    "    sess.run(init_op)\n",
    "    coord =tf.train.Coordinator()\n",
    "    threads = tf.train.start_queue_runners(sess=sess,coord=coord)\n",
    "    # 打印程序的后台线程信息\n",
    "    print('Thread:%s'%threads)\n",
    "    try:\n",
    "        for _ in range(10):   # 由于上文中num_epoches规定为2，也就是只能遍历文件2次，一个文件中包含2条记录，所以只能取出4条记录\n",
    "            if not coord.should_stop():\n",
    "                example=sess.run(features)\n",
    "                print(example)\n",
    "    except tf.errors.OutOfRangeError:\n",
    "        print('Catch OutOfRangeError')\n",
    "    finally:\n",
    "        # 请求停止所有后台线程\n",
    "        coord.request_stop()\n",
    "        print('Finish reading')\n",
    "    # 等待所有后台线程安全退出\n",
    "    coord.join(threads)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
